package com.ml4ai.backend.services.actors.anys

import java.io.InputStream

import akka.actor._
import com.ml4ai.core.stack.mq.RabbitMQAgent

import scala.reflect.io.{File, Path => FPath}
import scala.util.Try

/**
 * Actor that streams the contents of a file to a RabbitMQ queue, one chunk per message.
 *
 * Protocol (plain string messages):
 *  - `"start"`: opens the file at `filep` and replies `FileTaskAskContinue(id)`.
 *  - `"step"`: reads the next chunk (at most 2 MiB), publishes it to `queue`, and replies
 *    `FileTaskAskContinue(id)`. At end of file it publishes an empty byte array
 *    (presumably an end-of-stream marker for the consumer -- confirm against the consumer),
 *    replies `FileTaskTellComplete(id)` and stops itself.
 *  - `"stop"`: replies `FileTaskTellStop(id)` and stops itself.
 *
 * Any non-fatal failure while opening, reading or publishing is reported on stderr, answered
 * with `FileTaskTellStop(id)`, and stops the actor.
 *
 * @param id            task identifier echoed back in every reply
 * @param rabbitMQAgent agent used to publish the chunks
 * @param queue         name of the target queue
 * @param filep         path of the file to transfer
 */
class FileTransfer2RabbitMQWorkActor(val id: String, val rabbitMQAgent: RabbitMQAgent, val queue: String, filep: String) extends Actor {

  import scala.util.control.NonFatal

  /** Maximum number of bytes read from the file per "step" message. */
  private val chunkSize: Int = 2048 * 1024

  // Both stay null until a "start" message has been processed successfully.
  var file: File = null
  var input: InputStream = null

  override def receive: Receive = {
    case "start" =>
      try {
        // A repeated "start" must not leak the stream opened by the previous one.
        if (input != null) Try(input.close())
        file = File(FPath(filep))
        input = file.inputStream()
        sender() ! FileTaskAskContinue(id)
      } catch {
        case NonFatal(e) => abort(e)
      }

    case "step" =>
      try {
        // A fresh buffer per step: a completely filled buffer is handed to the agent as-is.
        val buff = new Array[Byte](chunkSize)
        val count = input.read(buff)
        if (count > 0) {
          // Publish only the bytes actually read, without boxing them through a List.
          val payload = if (count == buff.length) buff else java.util.Arrays.copyOf(buff, count)
          rabbitMQAgent.produce("", queue, payload, true)
          sender() ! FileTaskAskContinue(id)
        } else if (count < 0) {
          // End of file: publish an empty payload, then report completion and shut down.
          rabbitMQAgent.produce("", queue, Array[Byte](), true)
          sender() ! FileTaskTellComplete(id)
          close()
        } else {
          // Nothing was read but the stream is not exhausted; let the caller ask again.
          sender() ! FileTaskAskContinue(id)
        }
      } catch {
        // Also covers a "step" that arrives before "start" (input is still null).
        case NonFatal(e) => abort(e)
      }

    case "stop" =>
      sender() ! FileTaskTellStop(id)
      close()
  }

  /** Reports a failure on stderr, tells the requester to stop, and shuts this actor down. */
  private def abort(cause: Throwable): Unit = {
    System.err.println("发生异常停止")
    cause.printStackTrace()
    sender() ! FileTaskTellStop(id)
    close()
  }

  /** Stops this actor and closes the input stream if one is open, ignoring close errors. */
  def close(): Unit = {
    context stop self
    if (input != null)
      Try(input.close())
  }

}
